fix(physical-plan): make HashJoinExec dynamic filter pushdown idempotent #22523
`FilterPushdown::new_post_optimization()` was ANDing a fresh `DynamicFilterPhysicalExpr` onto the probe-side scan's predicate on every invocation. After N passes the probe-side data source carried `DynamicFilter AND DynamicFilter AND ...` (N copies of the same empty expression).

Root cause: `HashJoinExec::gather_filters_for_pushdown` always created a new dynamic filter in the Post phase, regardless of whether the join already had one from a prior pass. The previous-pass filter is retained on the `HashJoinExec` itself (the `dynamic_filter` field is preserved through `with_new_children` via the builder) and the same `Arc<DynamicFilterPhysicalExpr>` is wired into the probe-side scan's predicate, so a new one is redundant and stacks duplicates.

Fix: skip dynamic filter creation when `self.dynamic_filter.is_some()`. The existing shared `Arc` keeps the probe-side scan correctly wired to the build-side accumulator that will populate it.

Motivation: adaptive execution in datafusion-ballista AQE (apache/datafusion-ballista#1359) re-runs the entire `PhysicalOptimizer` chain after every completed stage. Unlike `OutputRequirements` (whose duplicate wrappers are masked by `new_remove_mode` later in the chain), this duplication survives to the executed plan and degrades scan performance with redundant filter evaluation.

Adds `post_phase_is_idempotent_on_hash_join` to `datafusion/core/tests/physical_optimizer/filter_pushdown.rs`: builds a hash join over two parquet scans, invokes the rule twice, and asserts that the plan strings match. Fails before this fix (two `AND DynamicFilter` clauses on the probe side); passes after.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
milenkovicm
left a comment
Thanks for your contribution, @wirybeaver.
I'll have to involve @adriangb for this, as he's the original creator.
if phase == FilterPushdownPhase::Post
    && self.dynamic_filter.is_none()
Unfortunately, I don't know much about this piece of code. This might produce the correct result, but I'm not sure it is the correct way to do it.
@adriangb or @gabotechs, it would be great if you could have a look; you know this code much better than I do. Thanks a lot.
adriangb
left a comment
This seems like a reasonable way to do this 👍
Do we have the same situation with SortExec?
Dynamic filtering seems to be applied only to HashJoinExec. Let me double-check this weekend.
@wirybeaver, I'm not sure we can move these optimizers to Ballista until DF 55 is released.
I think it's not too late to ask for something minor like this to be ported to 54, if that helps.
That's actually a good idea.
@wirybeaver, would you like to backport this PR to the 54 release branch? @adriangb has kindly offered to review it. Details can be found at #21080. If you don't get a chance to do it, I'll do it tomorrow.
I've moved the optimizers to Ballista until DF 55 is released, so there is no need to backport for now: apache/datafusion-ballista#1806
Which issue does this PR close?
Related to apache/datafusion-ballista#1359
Rationale
Ballista's Adaptive Query Execution (AQE) planner re-invokes DataFusion's full `PhysicalOptimizer` chain after every completed stage. `FilterPushdown::new_post_optimization()` is not idempotent on plans containing `HashJoinExec`.
In the `Post` phase, `HashJoinExec::gather_filters_for_pushdown` unconditionally creates a new `DynamicFilterPhysicalExpr` and installs it on the probe-side child via `with_self_filter`. After pass 1 the join already carries a `dynamic_filter: Some(...)`, and the shared `Arc<DynamicFilterPhysicalExpr>` is already wired into the probe-side scan's predicate. On pass 2 a second dynamic filter is created and ANDed onto the existing predicate, producing `DynamicFilter AND DynamicFilter`. Each subsequent pass adds another duplicate, compounding indefinitely in AQE replan loops.
What changes are included in this PR?
`HashJoinExec::gather_filters_for_pushdown`: skip dynamic-filter creation when `self.dynamic_filter.is_some()`, meaning a previous pass already installed one. The existing `Arc` remains valid and correctly wired into the probe-side scan.
`post_phase_is_idempotent_on_hash_join` in `tests/physical_optimizer/filter_pushdown.rs`: builds a `HashJoinExec`, runs `FilterPushdown::new_post_optimization()` twice, and asserts structural equality via `get_plan_string`.
Are these changes tested?
Yes. The new test fails without the fix (plan strings diverge due to duplicated dynamic filter predicates) and passes with it.
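For readers unfamiliar with the code, the behaviour the test pins down can be sketched with a small std-only model. This is not DataFusion code: `DynamicFilter`, `ProbeScan`, `Join`, `post_pass`, `plan_string`, and `run_passes` are hypothetical stand-ins invented for illustration, and the `guard` flag only exists here so the old and new behaviour can be compared side by side.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `DynamicFilterPhysicalExpr`.
#[derive(Debug)]
struct DynamicFilter;

/// Hypothetical stand-in for the probe-side scan: it only records the
/// dynamic filters that have been ANDed onto its predicate.
#[derive(Debug, Default)]
struct ProbeScan {
    predicates: Vec<Arc<DynamicFilter>>,
}

/// Hypothetical stand-in for `HashJoinExec`.
#[derive(Debug, Default)]
struct Join {
    dynamic_filter: Option<Arc<DynamicFilter>>,
    probe: ProbeScan,
}

impl Join {
    /// Models one Post-phase pushdown pass.
    /// `guard == false` mimics the behaviour before the fix;
    /// `guard == true` mimics the `dynamic_filter.is_none()` check.
    fn post_pass(&mut self, guard: bool) {
        if guard && self.dynamic_filter.is_some() {
            // A previous pass already installed a filter; reuse it.
            return;
        }
        let filter = Arc::new(DynamicFilter);
        // The same Arc is shared between the join and the probe-side scan.
        self.probe.predicates.push(Arc::clone(&filter));
        self.dynamic_filter = Some(filter);
    }

    /// Renders the probe-side predicate, loosely imitating a plan string.
    fn plan_string(&self) -> String {
        let parts: Vec<&str> = self
            .probe
            .predicates
            .iter()
            .map(|_| "DynamicFilter")
            .collect();
        format!("predicate=[{}]", parts.join(" AND "))
    }
}

/// Runs `n` passes on a fresh join and returns the resulting plan string.
fn run_passes(n: usize, guard: bool) -> String {
    let mut join = Join::default();
    for _ in 0..n {
        join.post_pass(guard);
    }
    join.plan_string()
}
```

In this model, comparing `run_passes(1, true)` with `run_passes(2, true)` plays the role of the plan-string comparison in the real test, while `run_passes(2, false)` shows the stacked predicate that the unguarded version produces.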
Are there any user-facing changes?
No. Dynamic filter pushdown is an internal optimization; the idempotence guard only affects re-optimization scenarios (AQE).